<?php

namespace think\queue\connector;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use think\helper\Str;
use think\queue\Connector;
use think\queue\job\Rabbitmq as RabbitmqJob;

class Rabbitmq extends Connector
{

    protected $AMQPConnection;
    protected $channel;
    protected $default;

    //配置参数
    protected $options = [
        'queue' => 'default',
        'host' => '127.0.0.1',
        'port' => 5672,
        'user' => 'guest',
        'password' => '',
        'vhost' => '/',
    ];

    /**
     * @param array $options
     */
    public function __construct(array $options)
    {
        if (!empty($options)) $this->options = array_merge($this->options, $options);
        $this->default = $this->options['queue']; //默认队列
        $this->AMQPConnection = new AMQPStreamConnection(
            $this->options['host'],
            $this->options['port'],
            $this->options['user'],
            $this->options['password'],
            $this->options['vhost']
        );
        $this->channel = $this->AMQPConnection->channel();
    }

    /**
     * 绑定队列
     * @param $queue
     * @return array
     */
    protected function queueBind($queue)
    {
        $queueName = $this->getQueueName($queue); //队列名
        $exchangeName = 'delayed.' . $queueName; //交换机名
        // 声明交换机
        $this->channel->exchange_declare(
            $exchangeName,
            'x-delayed-message',
            false,
            true,
            false,
            false,
            false,
            new AMQPTable(["x-delayed-type" => 'direct'])
        );
        // 声明队列
        $this->channel->queue_declare($queueName, false, true, false, false);
        // 绑定交换机队列
        $this->channel->queue_bind($queueName, $exchangeName);
        return [$queueName, $exchangeName];
    }

    /**
     * 获取队列消息数量
     * @param $queue
     * @return int
     */
    public function size($queue = null)
    {
        [$queueName, ] = $this->queueBind($queue);
        $msg = $this->channel->basic_get($queueName, false);
        $size = 0;
        if ($msg) $size = $msg->getMessageCount() + 1;
        return $size;
    }

    /**
     * 推送队列消息
     * @param $job
     * @param $data
     * @param $queue
     * @return mixed|null
     * @throws \Exception
     */
    public function push($job, $data = '', $queue = null)
    {
        return $this->pushRaw($this->createPayload($job, $data), $queue);
    }

    /**
     * 延时推送队列
     * @param $delay
     * @param $job
     * @param $data
     * @param $queue
     * @return mixed|null
     * @throws \Exception
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        return $this->pushRaw($this->createPayload($job, $data), $queue, ['delay' => $delay]);
    }

    /**
     * 推送消息
     * @param $payload
     * @param $queue
     * @param array $options
     * @return mixed|null
     * @throws \Exception
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        //绑定队列
        [, $exchangeName] = $this->queueBind($queue);
        // 发送消息
        $properties['delivery_mode'] = AMQPMessage::DELIVERY_MODE_PERSISTENT;
        //延时
        if (isset($options['delay'])) {
            $headers['x-delay'] = (int)$options['delay'] * 1000;
            $properties['application_headers'] = new AMQPTable($headers);
        }
        $msg = new AMQPMessage($payload, $properties);
        $this->channel->basic_publish($msg, $exchangeName);
        return json_decode($payload, true)['id'] ?? null;
    }

    /**
     * 取出消息
     * @param $queue
     * @return RabbitmqJob|void
     */
    public function pop($queue = null)
    {
        //绑定队列
        [$queueName, ] = $this->queueBind($queue);
        //取出队列消息
        $message = $this->channel->basic_get($queueName, false);
        if ($message) {
            return new RabbitMQJob($this, $this->channel, $message, $queueName);
        }
    }

    /**
     * payload数据补充id/attempts
     * @param $job
     * @param $data
     * @return array
     */
    protected function createPayloadArray($job, $data = '')
    {
        return array_merge(parent::createPayloadArray($job, $data), [
            'id' => $this->getRandomId(),
            'attempts' => 0
        ]);
    }

    /**
     * 随机id
     *
     * @return string
     */
    protected function getRandomId()
    {
        return Str::random(32);
    }

    /**
     * 获取队列名称
     * @param $queueName
     * @return mixed
     */
    protected function getQueueName($queueName = null)
    {
        return $queueName ?: $this->default;
    }

}
